Project Description¶
In this project, we take on the role of a data analyst at an energy company. Previously, analysts from other departments had to manually collect, clean, and prepare data every quarter to track changes in electricity sales and generation capabilities. This manual process was time-consuming and often frustrating for the teams involved.
Our goal is to automate this workflow by creating a data pipeline that retrieves and processes data every month. This pipeline will make it easier and faster for the company to gain insights from the data and allow analysts to spend more time on important tasks like analysis and decision-making rather than data preparation.
We will work with two raw data files:

- `electricity_sales.csv`
- `electricity_capability_nested.json`
The `electricity_sales.csv` file contains detailed records of electricity sales. Below is a data dictionary that describes the structure of this CSV file:
| Field | Data Type |
|---|---|
| period | str |
| stateid | str |
| stateDescription | str |
| sectorid | str |
| sectorName | str |
| price | float |
| price-units | str |
We will write Python code to build and test this automated pipeline. It will include steps for reading, transforming, and cleaning both datasets. Once completed, the pipeline will help deliver regular, clean, and structured data to support quicker and more accurate energy insights across the organization.
Question No 1: Define an `extract_tabular_data()` function to ingest tabular data. This function takes a single parameter, `file_path`. If `file_path` ends with `.csv`, use the `pd.read_csv()` function to extract the data. If `file_path` ends with `.parquet`, use the `pd.read_parquet()` function to extract the data.¶
```python
import pandas as pd
import json

def extract_tabular_data(file_path: str):
    """Extract data from a tabular file format, with pandas."""
    if file_path.endswith(".csv"):
        return pd.read_csv(file_path)
    elif file_path.endswith(".parquet"):
        return pd.read_parquet(file_path)
    else:
        raise Exception("Warning: Invalid file extension. Please try with .csv or .parquet!")
```
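Because the function dispatches purely on the file extension, a quick way to sanity-check it is to round-trip a throwaway CSV through a temporary directory. The file name `sales_sample.csv` and its columns are placeholders, not the real raw-data schema; the function is restated so the snippet runs standalone:

```python
# Smoke test of the extension-based dispatch: write a tiny CSV to a
# temporary directory and read it back through the function.
import os
import tempfile

import pandas as pd

def extract_tabular_data(file_path: str):
    """Extract data from a tabular file format, with pandas."""
    if file_path.endswith(".csv"):
        return pd.read_csv(file_path)
    elif file_path.endswith(".parquet"):
        return pd.read_parquet(file_path)
    else:
        raise Exception("Warning: Invalid file extension. Please try with .csv or .parquet!")

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "sales_sample.csv")  # placeholder file name
    pd.DataFrame({"stateid": ["CO"], "price": [11.5]}).to_csv(csv_path, index=False)
    df = extract_tabular_data(csv_path)

print(df.shape)  # (1, 2)
```

Any other extension (for example `.txt`) falls through to the `else` branch and raises the exception.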
Question No 2: Create another function with the name `extract_json_data()`, which takes a `file_path`. Use the `json_normalize()` function from the pandas library to flatten the nested JSON data, and return a pandas `DataFrame`.¶
```python
def extract_json_data(file_path):
    """Extract and flatten data from a JSON file."""
    # First, read the JSON file into memory using the json library
    with open(file_path, "r") as json_file:
        raw_data = json.load(json_file)
    return pd.json_normalize(raw_data)
```
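To see what the flattening step does, here is `pd.json_normalize()` applied to a small in-memory record. The nested field names below are made up for illustration; the real capability file has its own schema. By default, nested keys are joined with a `.` separator:

```python
# Minimal illustration of pd.json_normalize flattening a nested record.
# Field names here are hypothetical, not the real capability schema.
import pandas as pd

nested = [
    {"period": "2023-01",
     "plant": {"stateid": "CO",
               "capability": {"value": 120.5, "units": "megawatts"}}},
]
flat = pd.json_normalize(nested)
print(list(flat.columns))
# ['period', 'plant.stateid', 'plant.capability.value', 'plant.capability.units']
```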
Question No 3: Create a function called `transform_electricity_sales_data()` which takes a single parameter, `raw_data`. `raw_data` should be of type `pd.DataFrame`. The `transform_electricity_sales_data()` function needs to fulfill the requirements described below.¶

- Drop any records with NA values in the `price` column. Do this in place.
- Only keep records with a `sectorName` of "residential" or "transportation".
- Create a `year` column using the first 4 characters of the values in `period`.
- Create a `month` column using the last 2 characters of the values in `period`.
- Return the transformed `DataFrame`, keeping only the columns `year`, `month`, `stateid`, `price`, and `price-units`.
```python
def transform_electricity_sales_data(raw_data: pd.DataFrame):
    """
    Transform electricity sales to find the total amount of electricity sold
    in the residential and transportation sectors.
    """
    # Drop any records with a null value in the price column
    raw_data.dropna(subset=["price"], inplace=True)
    # Only keep residential and transportation records; .copy() avoids a
    # SettingWithCopyWarning when the new columns are added below
    cleaned_df = raw_data.loc[raw_data["sectorName"].isin(["residential", "transportation"]), :].copy()
    # Create year and month columns from the period string (e.g. "2023-01")
    cleaned_df["year"] = cleaned_df["period"].str[0:4]
    cleaned_df["month"] = cleaned_df["period"].str[5:]
    # Only keep the columns year, month, stateid, price, and price-units
    cleaned_df = cleaned_df.loc[:, ["year", "month", "stateid", "price", "price-units"]]
    return cleaned_df
```
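A hand-built three-row frame (made-up values, not real sales figures) makes the filtering and the year/month split easy to verify. The transformation steps are restated in a local helper so the snippet runs standalone:

```python
# Run three fabricated rows through the transformation: one residential
# row (kept), one commercial row (wrong sector), and one transportation
# row with a null price (dropped).
import pandas as pd

def transform_sales(raw_data: pd.DataFrame) -> pd.DataFrame:
    kept = raw_data.dropna(subset=["price"])
    kept = kept.loc[kept["sectorName"].isin(["residential", "transportation"])].copy()
    kept["year"] = kept["period"].str[0:4]
    kept["month"] = kept["period"].str[5:]
    return kept.loc[:, ["year", "month", "stateid", "price", "price-units"]]

raw = pd.DataFrame({
    "period": ["2023-01", "2023-01", "2023-02"],
    "stateid": ["CO", "CO", "TX"],
    "sectorName": ["residential", "commercial", "transportation"],
    "price": [12.3, 10.1, None],
    "price-units": ["cents per kilowatt-hour"] * 3,
})
out = transform_sales(raw)
# Only the residential row survives
print(out.to_dict("records"))
```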
Question No 4: Define a function called `load()`, which takes a `DataFrame` and a `file_path`. If the `file_path` ends with `.csv`, load the DataFrame to a CSV file. If instead the `file_path` ends with `.parquet`, load the DataFrame to a Parquet file.¶
```python
def load(dataframe: pd.DataFrame, file_path: str):
    """
    Load a DataFrame to a file in either CSV or Parquet format.
    """
    # Check whether the file path ends with .csv or .parquet
    if file_path.endswith(".csv"):
        dataframe.to_csv(file_path)
    elif file_path.endswith(".parquet"):
        dataframe.to_parquet(file_path)
    # Otherwise, raise an exception
    else:
        raise Exception(f"Warning: {file_path} is not a valid file type. Please try again!")
```
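A dry run of the loader against a temporary directory confirms both branches behave as intended; `cleaned_sales.csv` is a placeholder path. One variation worth noting: the sketch below passes `index=False` to `to_csv()` so the round-trip comes back without an extra index column, whereas the version above writes the index too.

```python
# Write a one-row frame out through load(), read it back, and check that
# an unsupported extension raises. Paths and values are illustrative.
import os
import tempfile

import pandas as pd

def load(dataframe: pd.DataFrame, file_path: str):
    """Load a DataFrame to a CSV or Parquet file based on the extension."""
    if file_path.endswith(".csv"):
        dataframe.to_csv(file_path, index=False)
    elif file_path.endswith(".parquet"):
        dataframe.to_parquet(file_path)
    else:
        raise Exception(f"Warning: {file_path} is not a valid file type. Please try again!")

with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "cleaned_sales.csv")  # placeholder path
    load(pd.DataFrame({"stateid": ["CO"], "price": [12.3]}), out_path)
    round_trip = pd.read_csv(out_path)

print(round_trip.columns.tolist())  # ['stateid', 'price']
```

With all four functions in place, the monthly pipeline is simply extract, transform, and load chained together on the two raw files.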